package gu.simplemq.mqtt;

import java.util.Properties;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;

import com.google.common.base.Strings;

import gu.simplemq.pool.BaseMQPool;
import gu.simplemq.utils.IntrospectionSupport;
/**
 * Lazily initialized {@link MqttClient} pool.<br>
 * A single client is created and connected on the first {@link #borrow()} call and is
 * then handed out to every caller until {@link #close()} discards it. Creation and
 * disposal of the client are guarded by this instance's monitor.
 * @author guyadong
 *
 */
public class MqttPoolLazy extends BaseMQPool<MqttClient> implements MqttConstants{
	/** Prefix of the property names that are forwarded as SSL properties to the connect options. */
	private static final String SSL_PROPERTY_PREFIX = "com.ibm.ssl.";

	private String serverURI;
	private String clientId;
	private int qos = DEFAULT_QOS;
	private final MqttConnectOptions connOpts = new MqttConnectOptions();
	/** Shared client; stays {@code null} until a client has been created AND connected successfully. */
	private volatile MqttClient mqttClient;
	/** Value passed to {@link MqttClient#setTimeToWait(long)}; only applied when greater than zero. */
	private long timeTowaitInMillis = -1;

	/**
	 * Constructor used by {@link gu.simplemq.pool.NamedMQPools#createPool(Properties)}
	 * to create an instance reflectively.
	 * 
	 * @param props connection parameters
	 */
	public MqttPoolLazy (Properties props) {
		super(PropertiesHelper.MHELPER.initParameters(props), PropertiesHelper.MHELPER.getLocationlURI(props));
		String timeTowaitKey = CONNOPTS_PREFIX+"timeTowaitInMillis";
		String timeTowait = properties.getProperty(timeTowaitKey);
		if(null!=timeTowait) {
			try {
				timeTowaitInMillis = Long.parseLong(timeTowait.trim());
			} catch (NumberFormatException e) {
				// best effort: keep the default (-1, i.e. not applied) but make the bad setting visible
				logger.warn("ignore invalid value of {}: {}", timeTowaitKey, timeTowait);
			}
		}
		IntrospectionSupport.setProperties(this, properties, false);
		IntrospectionSupport.setProperties(connOpts, properties, CONNOPTS_PREFIX);
		// always enable automatic reconnect
		connOpts.setAutomaticReconnect(true);
		Properties sslProperties = new Properties();
		for(String key:properties.stringPropertyNames()) {
			if(key.startsWith(SSL_PROPERTY_PREFIX)) {
				sslProperties.setProperty(key, properties.getProperty(key));
			}
		}
		if(!sslProperties.isEmpty()) {
			// apply the SSL properties
			connOpts.setSSLProperties(sslProperties);
			// NOTE(review): hostname verification is switched off unconditionally whenever SSL
			// properties are present, overriding any value configured through the properties.
			// This weakens TLS peer validation - confirm that it is intended.
			connOpts.setHttpsHostnameVerificationEnabled(false);
		}
	}
	
	/**
	 * @param serverURI address of the MQTT server handed to the {@link MqttClient} constructor
	 * @return this instance
	 */
	public MqttPoolLazy setServerURI(String serverURI) {
		this.serverURI = serverURI;
		return this;
	}

	/**
	 * @param clientId client identifier; when {@code null} or empty, 
	 * 				{@link MqttClient#generateClientId()} is used at creation time
	 * @return this instance
	 */
	public MqttPoolLazy setClientId(String clientId) {
		this.clientId = clientId;
		return this;
	}

	/**
	 * @return the connect options instance used when connecting the client
	 * @since 2.4.0
	 */
	public MqttConnectOptions getConnOpts() {
		return connOpts;
	}

	public int getQos() {
		return qos;
	}

	public void setQos(int qos) {
		this.qos = qos;
	}

	/**
	 * Return the shared client, creating and connecting it on first use.<br>
	 * The client is published to other threads only after the connection succeeded, 
	 * so a failed attempt leaves the pool empty and the next call tries again.
	 * @see gu.simplemq.pool.BaseMQPool#borrow()
	 * @throws MQPoolException if the client cannot be created or connected
	 */
	@Override
	public MqttClient borrow()throws MQPoolException{
		MqttClient client = mqttClient;
		if(client == null){
			synchronized (this) {
				// re-check under the lock: another thread may have created the client meanwhile
				client = mqttClient;
				if(client == null){
					client = createConnectedClient();
					mqttClient = client;
					logger.info("mqtt client initialized(MQTT客户端初始化)  {} ",getCanonicalURI());
				}
			}
		}
		return client;
	}

	/**
	 * Create a new client and connect it with {@link #connOpts}.<br>
	 * If creating or connecting fails, the half-initialized client is closed before the error is reported.
	 * @return connected client
	 * @throws MQPoolException wrapping the {@link MqttException} raised by the client
	 */
	private MqttClient createConnectedClient() throws MQPoolException{
		MqttClient client = null;
		try {
			client = new MqttClient(
					serverURI, 
					Strings.isNullOrEmpty(clientId) ? MqttClient.generateClientId() : clientId, 
					null);
			if(timeTowaitInMillis>0) {
				client.setTimeToWait(timeTowaitInMillis);
			}
			client.connect(connOpts);
			return client;
		} catch (MqttException e) {
			if(client != null) {
				try {
					client.close();
				} catch (MqttException closeError) {
					// the connect failure is the error worth reporting; only record this one
					logger.warn("fail to close mqtt client after connect failure: {}", closeError.getMessage());
				}
			}
			throw new MQPoolException(e);
		}
	}
    
	/**
	 * Does nothing: the single client stays owned by the pool.
	 */
	@Override
	public void release(MqttClient r) {
		// DO NOTHING
	}

	/**
	 * Disconnect and close the shared client, if one exists.<br>
	 * The client reference is dropped and the pool is marked closed before shutting the client down, 
	 * so a failure while disconnecting or closing cannot leave a broken client behind for {@link #borrow()}.
	 * @throws MQPoolException wrapping the first {@link MqttException} raised while disconnecting or closing
	 */
	@Override
	public void close(){
		// double check
		if(mqttClient != null){
			synchronized (this) {
				MqttClient client = mqttClient;
				if(client != null){
					logger.info("discard mqtt client: {}",this);
					mqttClient = null;
					closed = true;
					MqttException failure = null;
					try {
						// the client must be disconnected before it can be closed
						if(client.isConnected()){
							client.disconnect();
						}
					} catch (MqttException e) {
						failure = e;
					}
					try {
						// attempt to close even if disconnect failed
						client.close();
					} catch (MqttException e) {
						if(failure == null) {
							failure = e;
						}
					}
					if(failure != null) {
						throw new MQPoolException(failure);
					}
				}				
			}
		}
	}
}
